I. Thống kê cơ bản (Basic Statistics)

1. Tương quan (Correlation)

Việc tính toán tương quan giữa hai chuỗi dữ liệu là một thao tác phổ biến trong thống kê. Trong spark.ml, bạn có thể tính toán ma trận tương quan cặp giữa nhiều chuỗi dữ liệu. Các phương pháp tương quan được hỗ trợ hiện tại bao gồm tương quan Pearson và Spearman.

Hàm Correlation.corr tính toán ma trận tương quan cho tập dữ liệu đầu vào chứa các vector, sử dụng phương pháp được chỉ định. Kết quả trả về là một DataFrame chứa ma trận tương quan của cột vector.

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation
  
# Khởi tạo SparkSession
spark = SparkSession.builder.appName("CorrelationExample").getOrCreate()  

# Dữ liệu mẫu
data = [(Vectors.sparse(4, [(0, 1.0), (3, -2.0)]),),

        (Vectors.dense([4.0, 5.0, 0.0, 3.0]),),

        (Vectors.dense([6.0, 7.0, 0.0, 8.0]),),

        (Vectors.sparse(4, [(0, 9.0), (3, 1.0)]),)]


# Tạo DataFrame
df = spark.createDataFrame(data, ["features"])


# Tính hệ số tương quan Pearson
r1 = Correlation.corr(df, "features").head()
print("Ma trận tương quan Pearson:\n", r1[0])

  
# Tính hệ số tương quan Spearman
r2 = Correlation.corr(df, "features", "spearman").head()
print("Ma trận tương quan Spearman:\n", r2[0])

Pasted image 20250320130727.png
Pasted image 20250320130758.png

2. Kiểm định giả thuyết (Hypothesis Testing)

MLlib cung cấp các công cụ để thực hiện kiểm định giả thuyết, giúp xác định xem một giả thuyết thống kê có thể được chấp nhận hay không dựa trên dữ liệu mẫu.

Kiểm định Chi-Square (ChiSquareTest)

Kiểm định Chi-Square được sử dụng để kiểm tra sự độc lập giữa các biến hoặc để kiểm tra sự phù hợp của phân phối quan sát với phân phối mong đợi.

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import ChiSquareTest
  
# Khởi tạo SparkSession
spark = SparkSession.builder.appName("ChiSquareTestExample").getOrCreate()

data = [(0.0, Vectors.dense(0.5, 10.0)),
        (0.0, Vectors.dense(1.5, 20.0)),
        (1.0, Vectors.dense(1.5, 30.0)),
        (0.0, Vectors.dense(3.5, 30.0)),
        (0.0, Vectors.dense(3.5, 40.0)),
        (1.0, Vectors.dense(3.5, 40.0))]

df = spark.createDataFrame(data, ["label", "features"])
  
r = ChiSquareTest.test(df, "features", "label").head()
  

print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))

Pasted image 20250320131327.png

3. Tóm tắt thống kê (Summarizer)

MLlib cung cấp các công cụ để tính toán các thống kê tóm tắt như trung bình, phương sai, min, max, và norm L2 cho các vector đầu vào.

from pyspark.sql import SparkSession
from pyspark.ml.stat import Summarizer
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

# Khởi tạo SparkSession
spark = SparkSession.builder.appName("ChiSquareTestExample").getOrCreate()

  
df = spark.createDataFrame([
      Row(weight=1.0, features=Vectors.dense(1.0, 1.0, 1.0)),
      Row(weight=0.0, features=Vectors.dense(1.0, 2.0, 3.0))])
  

# create summarizer for multiple metrics "mean" and "count"
summarizer = Summarizer.metrics("mean", "count")

# Tính toán thống kê với trọng số
df.select(summarizer.summary(df.features, df.weight)).show(truncate=False)

# Tính toán thống kê không có trọng số
df.select(summarizer.summary(df.features)).show(truncate=False)

# Tính toán thống kê cho một metric đơn lẻ ("mean") có trọng số
df.select(Summarizer.mean(df.features, df.weight)).show(truncate=False)

# Tính toán thống kê cho một metric đơn lẻ ("mean") không có trọng số
df.select(Summarizer.mean(df.features)).show(truncate=False)
  • Tính toán thống kê với trọng sốPasted image 20250320132550.png
  • Tính toán thống kê không có trọng số Pasted image 20250320132604.png
  • Tính toán thống kê cho một metric đơn lẻ ("mean") có trọng số Pasted image 20250320132654.png
  • Tính toán thống kê cho một metric đơn lẻ ("mean") không có trọng số Pasted image 20250320132729.png

II. Data Sources trong Spark ML

Trong MLlib, dữ liệu có thể đến từ nhiều nguồn khác nhau. Spark ML hỗ trợ đọc và ghi dữ liệu từ nhiều định dạng phổ biến như Parquet, JSON, CSV, LIBSVM, giúp dễ dàng thao tác với dữ liệu máy học.

1. Image Data Source

Nguồn dữ liệu hình ảnh được sử dụng để tải ảnh từ một thư mục. Nó có thể đọc các tệp ảnh nén (JPEG, PNG, v.v.) và chuyển đổi chúng thành định dạng ảnh thô bằng thư viện ImageIO của Java.
Khi dữ liệu được tải vào Spark, nó sẽ được lưu trong một DataFrame với một cột duy nhất có kiểu StructType mang tên "image", chứa dữ liệu ảnh theo định dạng schema ảnh.
Cấu trúc của cột "image" gồm các trường sau:

  • origin (StringType): Đường dẫn đến tệp ảnh.
  • height (IntegerType): Chiều cao của ảnh (tính theo pixel).
  • width (IntegerType): Chiều rộng của ảnh (tính theo pixel).
  • nChannels (IntegerType): Số lượng kênh màu trong ảnh (ví dụ: 1 = grayscale, 3 = RGB).
  • mode (IntegerType): Kiểu dữ liệu ảnh (theo định dạng tương thích OpenCV).
  • data (BinaryType): Dữ liệu ảnh dưới dạng byte, được lưu theo thứ tự tương thích với OpenCV (thường là BGR theo hàng).

Tổ chức thư mục: Pasted image 20250320141747.png

from pyspark.sql import SparkSession

  
# Tạo SparkSession
spark = SparkSession.builder.appName("ImageDataExample").getOrCreate()
  
# Đọc dữ liệu hình ảnh
image_df = spark.read.format("image").load("file:///home/hadoopvinhtuong/MLib/Data_sources/images")
# file:// là để đọc ảnh được lưu trữ local
# hdfs:// là để đọc ảnh được lưu trữ trên hadoop


# Hiển thị dữ liệu
image_df.select("image.origin", "image.width", "image.height").show(truncate=False)

Pasted image 20250320141409.png

2. LIBSVM data source

LIBSVM là định dạng phổ biến trong học máy, thường chứa label (nhãn) và features (vector đặc trưng).
Dữ liệu sau khi được tải vào Spark DataFrame sẽ có hai cột chính:

  • label: Chứa nhãn của từng mẫu dữ liệu, được lưu dưới dạng số thực (Double).
  • features: Chứa vector đặc trưng của mẫu, được lưu dưới dạng Vector (kiểu dữ liệu VectorUDT trong Spark ML).

Tổ chức thư mục: Pasted image 20250320143008.png

from pyspark.sql import SparkSession

# Tạo SparkSession
spark = SparkSession.builder.appName("ImageDataExample").getOrCreate(

# Đọc dữ liệu hình ảnh
df = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")

df.show(10)

Pasted image 20250320142919.png


III. ML Pipelines

ML Pipelines cung cấp một tập hợp các API cấp cao, thống nhất, được xây dựng trên nền tảng DataFrames, giúp người dùng tạo và tinh chỉnh các pipeline học máy thực tiễn.
Các khái niệm chính trong Pipelines:

  • DataFrame: API ML sử dụng DataFrame từ Spark SQL làm tập dữ liệu ML, có thể chứa nhiều loại dữ liệu khác nhau như văn bản, vector đặc trưng, nhãn thực và dự đoán.
  • Transformer: Là một thuật toán có thể chuyển đổi một DataFrame thành một DataFrame khác. Ví dụ, một mô hình ML là một Transformer chuyển đổi một DataFrame với các đặc trưng thành một DataFrame với các dự đoán.
  • Estimator: Là một thuật toán có thể được huấn luyện trên một DataFrame để tạo ra một Transformer. Ví dụ, một thuật toán học là một Estimator được huấn luyện trên một DataFrame và tạo ra một mô hình.
  • Pipeline: Kết hợp nhiều Transformer và Estimator lại với nhau để xác định một workflow ML.
  • Parameter: Tất cả các Transformer và Estimator hiện chia sẻ một API chung để xác định các tham số.

1. DataFrame

Machine learning có thể được áp dụng cho nhiều loại dữ liệu khác nhau, chẳng hạn như vector, văn bản, hình ảnh và dữ liệu có cấu trúc. Để hỗ trợ sự đa dạng này, API của Spark ML sử dụng DataFrame từ Spark SQL.

DataFrame hỗ trợ nhiều kiểu dữ liệu cơ bản và có cấu trúc. Ngoài ra, DataFrame cũng có thể chứa các kiểu vector đặc trưng của ML.

Có thể tạo DataFrame từ một RDD theo cách ngầm định hoặc tường minh.

Trong DataFrame, các cột được đặt tên cụ thể. Ví dụ trong các đoạn code ví dụ, ta có các cột như "text", "features""label".

2. Các thành phần của Pipeline

2.1) Transformer

Transformer là một khái niệm trừu tượng bao gồm cả các bộ biến đổi đặc trưng (feature transformers) và các mô hình đã học. Về mặt kỹ thuật, Transformer triển khai phương thức transform(), có nhiệm vụ chuyển đổi một DataFrame thành một DataFrame khác bằng cách thêm một hoặc nhiều cột. Ví dụ:

  • Một bộ biến đổi đặc trưng có thể đọc một cột (ví dụ: văn bản), chuyển đổi nó thành vector đặc trưng và thêm cột mới vào DataFrame.
  • Một mô hình học máy có thể đọc cột chứa vector đặc trưng, dự đoán nhãn cho từng vector và thêm cột dự đoán vào DataFrame.

2.2) Estimator

Estimator là một khái niệm trừu tượng đại diện cho thuật toán học hoặc bất kỳ thuật toán nào cần huấn luyện trên dữ liệu. Về mặt kỹ thuật, Estimator triển khai phương thức fit(), nhận vào một DataFrame và tạo ra một Model (là một Transformer).
Ví dụ, thuật toán Logistic Regression là một Estimator. Khi gọi fit(), nó sẽ huấn luyện và tạo ra một LogisticRegressionModel, vốn là một Model và cũng là một Transformer.

2.3) Tính chất của các thành phần trong Pipeline

  • Transformer.transform()Estimator.fit() đều không có trạng thái (stateless), tức là chúng không lưu trạng thái giữa các lần gọi.
  • Mỗi Transformer hoặc Estimator đều có một ID duy nhất, giúp định danh và thiết lập các tham số.

2.4) Pipeline là gì?

Trong machine learning, chúng ta thường cần chạy một chuỗi thuật toán để xử lý và học từ dữ liệu. Ví dụ, quy trình xử lý văn bản có thể bao gồm:

  1. Tách văn bản thành các từ.
  2. Chuyển đổi các từ thành vector đặc trưng số.
  3. Huấn luyện mô hình dự đoán dựa trên các vector đặc trưng và nhãn.

Spark ML biểu diễn quy trình này dưới dạng Pipeline, gồm một chuỗi các PipelineStages (có thể là Transformer hoặc Estimator) thực hiện theo một thứ tự nhất định.

3. Pipeline hoạt động như thế nào?

Một Pipeline bao gồm nhiều giai đoạn (stages), mỗi giai đoạn là một Transformer hoặc Estimator. Khi thực thi Pipeline:

  1. Nếu một giai đoạn là Estimator, phương thức fit() sẽ được gọi để huấn luyện mô hình. Sau đó, mô hình này (một Transformer) sẽ được sử dụng để chuyển đổi DataFrame trước khi chuyển tiếp sang giai đoạn tiếp theo.
  2. Nếu một giai đoạn là Transformer, phương thức transform() của nó sẽ được gọi trên DataFrame.

3.1 Estimator

Giai đoạn Huấn luyện (Training Time)

  • Pipeline được thiết lập như một chuỗi các giai đoạn (stages), mỗi giai đoạn có thể là Transformer hoặc Estimator.
  • Dữ liệu đầu vào ban đầu là một DataFrame chứa văn bản thô và nhãn tương ứng.

Các bước thực hiện:

  1. Tokenizer (Transformer): Chuyển đổi văn bản thô thành danh sách từ. Kết quả là một DataFrame mới có cột chứa các từ đã tách.
  2. HashingTF (Transformer): Chuyển đổi danh sách từ thành vector đặc trưng. Kết quả là một DataFrame mới có cột chứa vector đặc trưng.
  3. Logistic Regression (Estimator): Đây là một Estimator, nên nó sẽ được huấn luyện trên dữ liệu vector đặc trưng và nhãn. Sau khi gọi fit(), nó tạo ra một LogisticRegressionModel.
  • Sau khi fit() chạy, Pipeline tạo ra một PipelineModel, trong đó các Estimator đã trở thành Transformer.
    Pasted image 20250320143630.png

3.2 Transformer

Giai đoạn Dự đoán (Test Time)

  • PipelineModel được sử dụng để xử lý dữ liệu mới với cùng một quy trình như khi huấn luyện.
  • Lúc này, tất cả các Estimator trong Pipeline ban đầu đã trở thành Transformer trong PipelineModel.

Các bước dự đoán:

  1. Tokenizer.transform(): Tách văn bản mới thành danh sách từ.
  2. HashingTF.transform(): Chuyển danh sách từ thành vector đặc trưng.
  3. LogisticRegressionModel.transform(): Sử dụng mô hình đã huấn luyện để dự đoán nhãn cho dữ liệu mới.

Kết quả cuối cùng là một DataFrame với cột Predictions, chứa nhãn dự đoán cho dữ liệu mới.
Pasted image 20250320145915.png

3.3 Tóm tắt

  • Pipeline ban đầu là một Estimator, chứa cả TransformerEstimator.
  • Khi gọi fit(), Pipeline tạo ra một PipelineModel, trong đó tất cả Estimator đã trở thành Transformer.
  • Khi gọi PipelineModel.transform(), dữ liệu đi qua toàn bộ quy trình tương tự như lúc huấn luyện để đảm bảo tính nhất quán giữa tập huấn luyện và tập thử nghiệm.

Pipeline giúp tự động hóa quy trình tiền xử lý và huấn luyện, giúp mã nguồn trở nên gọn gàng và dễ bảo trì hơn

4. Các chi tiết bổ sung

  • DAG Pipelines: Một Pipeline có thể có cấu trúc đồ thị có hướng không chu trình (DAG), trong đó các giai đoạn có thể không tuần tự mà có thể có nhiều đường rẽ nhánh. Việc tổ chức DAG hiện tại dựa trên tên cột đầu vào và đầu ra.
  • Kiểm tra dữ liệu tại thời gian chạy: Vì Pipeline hoạt động trên nhiều loại DataFrame khác nhau, nó sử dụng kiểm tra kiểu dữ liệu động (runtime type checking) thay vì kiểm tra lúc biên dịch.
  • Các giai đoạn trong Pipeline phải là duy nhất: Mỗi giai đoạn phải là một đối tượng riêng biệt. Ví dụ, không thể sử dụng cùng một đối tượng myHashingTF hai lần trong một Pipeline, nhưng có thể sử dụng hai đối tượng myHashingTF1myHashingTF2 riêng biệt.

5.Thiết lập tham số trong ML Pipelines

Spark ML cung cấp một API thống nhất để thiết lập tham số:

  • Mỗi tham số (Param) đều có một tên cụ thể và mô tả.
  • Một ParamMap là tập hợp các cặp (tham số, giá trị), dùng để truyền tham số vào Estimator hoặc Transformer.

Có hai cách để đặt tham số:

  1. Đặt trực tiếp trên đối tượng
lr.setMaxIter(10)  # Đặt số lần lặp tối đa là 10
  1. Sử dụng ParamMap khi gọi fit() hoặc transform()
paramMap = ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)

6.Lưu và tải Pipelines

Trong thực tế, việc lưu mô hình hoặc Pipeline để sử dụng sau này rất quan trọng. Spark hỗ trợ lưu trữ và tải lại mô hình từ Spark 1.6. Từ Spark 2.3 trở đi, API DataFrame-based trong spark.ml hỗ trợ đầy đủ việc lưu trữ.

  • Spark hỗ trợ lưu trữ mô hình giữa các ngôn ngữ (Scala, Java, Python).
  • Riêng R hiện có định dạng khác và chỉ có thể tải lại trong R (điều này có thể thay đổi trong tương lai).

7. Tương thích ngược trong ML Pipelines

Spark ML đảm bảo tương thích ngược cho các Pipeline:

  • Giữa các phiên bản nhỏ và bản vá (minor & patch versions): Đảm bảo tương thích.
  • Giữa các phiên bản lớn (major versions): Không đảm bảo nhưng sẽ cố gắng duy trì.
  • Hành vi mô hình: Trong các bản cập nhật nhỏ, hành vi mô hình giữ nguyên (trừ khi có lỗi được sửa).

8. Code Example

8.1 Estimator, Transformer, and Param

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression 
  
spark = SparkSession.builder.master("local").appName("Example").getOrCreate()


# Prepare training data from a list of (label, features) tuples.
training = spark.createDataFrame([

    (1.0, Vectors.dense([0.0, 1.1, 0.1])),

    (0.0, Vectors.dense([2.0, 1.0, -1.0])),

    (0.0, Vectors.dense([2.0, 1.3, 1.0])),

    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])


# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)

# Print out the parameters, documentation, and any default values.
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(training)

# Since model1 is a Model (i.e., a transformer produced by an Estimator),
# we can view the parameters it used during fit().
# This prints the parameter (name: value) pairs, where names are unique IDs for this
# LogisticRegression instance.
print("Model 1 was fit using parameters: ")
print(model1.extractParamMap())

# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30  # Specify 1 Param, overwriting the original maxIter.

# Specify multiple Params.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})  # type: ignore

# You can combine paramMaps, which are python dictionaries.
# Change output column name
paramMap2 = {lr.probabilityCol: "myProbability"}
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)  # type: ignore

# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.

model2 = lr.fit(training, paramMapCombined)
print("Model 2 was fit using parameters: ")
print(model2.extractParamMap())

# Prepare test data
test = spark.createDataFrame([
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])
  
# Make predictions on test data using the Transformer.transform() method.
# LogisticRegression.transform will only use the 'features' column.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
prediction = model2.transform(test)
result = prediction.select("features", "label", "myProbability", "prediction") \
    .collect()
    
for row in result:
    print("features=%s, label=%s -> prob=%s, prediction=%s"
          % (row.features, row.label, row.myProbability, row.prediction))

LogisticRegression parameters: Pasted image 20250320152058.png

Model 1 was fit using parameters: Pasted image 20250320152227.png

Model 2 was fit using parameters: Pasted image 20250320152404.png

Test: Pasted image 20250320152608.png

8.2 Pipeline

from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, Tokenizer

spark = SparkSession.builder.master("local").appName("Example").getOrCreate()
  
# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])
 
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)
 

# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")

for row in selected.collect():
    rid, text, prob, prediction = row
    print(
        "(%d, %s) --> prob=%s, prediction=%f" % (
            rid, text, str(prob), prediction   # type: ignore
        )
    )

kết quả: Pasted image 20250320153214.png


IV. Các phương pháp trích xuất, chuyển đổi và lựa chọn đặc trưng (Extracting, transforming and selecting features)

1. Trích xuất đặc trưng (Feature Extractors)

TF-IDF (Term frequency-inverse document frequency)

Phương pháp vector hóa đặc trưng phổ biến trong khai thác văn bản, phản ánh tầm quan trọng của một từ trong tài liệu thuộc tập hợp.

from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF

spark = SparkSession.builder.appName("TF-IDF Example").getOrCreate()

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])


tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()

Output: Pasted image 20250320155353.png


Word2Vec

Mô hình tính toán biểu diễn vector phân tán của từ, giúp các từ tương tự gần nhau trong không gian vector, hỗ trợ trong các ứng dụng xử lý ngôn ngữ tự nhiên.

from pyspark.sql import SparkSession
from pyspark.ml.feature import Word2Vec

spark = SparkSession.builder.appName("Word2Vec Example").getOrCreate()

# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)

for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

Output: Pasted image 20250320155937.png


CountVectorizer

Chuyển đổi tập hợp tài liệu văn bản thành vector số lượng từ, hữu ích trong việc xây dựng các mô hình học máy.

from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer

spark = SparkSession.builder.appName("CountVectorizer Example").getOrCreate()

# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)
model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)

Output: Pasted image 20250320160445.png


FeatureHasher

Sử dụng hàm băm để chuyển đổi các đặc trưng thành vector số, giúp giảm kích thước không gian đặc trưng.

from pyspark.sql import SparkSession
from pyspark.ml.feature import FeatureHasher


spark = SparkSession.builder.appName("FeatureHasher Example").getOrCreate()


dataset = spark.createDataFrame([
    (2.2, True, "1", "foo"),
    (3.3, False, "2", "bar"),
    (4.4, False, "3", "baz"),
    (5.5, False, "4", "foo")
], ["real", "bool", "stringNum", "string"])

hasher = FeatureHasher(inputCols=["real", "bool", "stringNum", "string"],
                       outputCol="features")
featurized = hasher.transform(dataset)
featurized.show(truncate=False)

Output: Pasted image 20250320160901.png

2. Chuyển đổi đặc trưng (Feature Transformers)

Tokenizer

Chia văn bản thành các từ hoặc câu dựa trên dấu cách hoặc dấu câu

from pyspark.sql import SparkSession

from pyspark.ml.feature import Tokenizer, RegexTokenizer

from pyspark.sql.functions import col, udf

from pyspark.sql.types import IntegerType


spark = SparkSession.builder.appName("Tokenizer Example").getOrCreate()


sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])


tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")

# alternatively, pattern="\\w+", gaps(False)
countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)

tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

Output: Pasted image 20250320161244.png
Pasted image 20250320161246.png

StopWordsRemover

Loại bỏ các từ dừng (như "và", "hoặc"). Từ dừng là những từ cần loại trừ khỏi dữ liệu đầu vào, thường là do các từ này xuất hiện thường xuyên và không mang nhiều ý nghĩa

from pyspark.sql import SparkSession

from pyspark.ml.feature import StopWordsRemover

  

spark = SparkSession.builder.appName("StopWordsRemover Example").getOrCreate()

  

sentenceData = spark.createDataFrame([

    (0, ["I", "saw", "the", "red", "balloon"]),

    (1, ["Mary", "had", "a", "little", "lamb"])

], ["id", "raw"])

  

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")

remover.transform(sentenceData).show(truncate=False)

Output: Pasted image 20250320163241.png

N-gram

Tạo ra các chuỗi liên tiếp của 'n' từ từ một văn bản, giúp mô hình hiểu ngữ cảnh tốt hơn.

from pyspark.sql import SparkSession

from pyspark.ml.feature import NGram

  

spark = SparkSession.builder.appName("NGram Example").getOrCreate()

  
  

wordDataFrame = spark.createDataFrame([

    (0, ["Hi", "I", "heard", "about", "Spark"]),

    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),

    (2, ["Logistic", "regression", "models", "are", "neat"])

], ["id", "words"])

  

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

  

ngramDataFrame = ngram.transform(wordDataFrame)

ngramDataFrame.select("ngrams").show(truncate=False)

Output: Pasted image 20250320163539.png

Binarizer

Chuyển đổi các giá trị số thành 0 hoặc 1 dựa trên ngưỡng xác định.

from pyspark.sql import SparkSession

from pyspark.ml.feature import Binarizer

  

spark = SparkSession.builder.appName("Binarizer Example").getOrCreate()

  
  

continuousDataFrame = spark.createDataFrame([

    (0, 0.1),

    (1, 0.8),

    (2, 0.2)

], ["id", "feature"])

  

binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")

  

binarizedDataFrame = binarizer.transform(continuousDataFrame)

  

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())

binarizedDataFrame.show()

Pasted image 20250320163737.png

PCA (Phân tích thành phần chính)

Giảm số lượng đặc trưng bằng cách chuyển đổi chúng thành tập hợp các thành phần chính.

from pyspark.sql import SparkSession

from pyspark.ml.feature import PCA

from pyspark.ml.linalg import Vectors

  

spark = SparkSession.builder.appName("PCA Example").getOrCreate()

  
  
  

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),

        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),

        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]

df = spark.createDataFrame(data, ["features"])

  

pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")

model = pca.fit(df)

  

result = model.transform(df).select("pcaFeatures")

result.show(truncate=False)

Output: Pasted image 20250320163935.png

PolynomialExpansion

Tạo ra các đặc trưng đa thức từ các đặc trưng ban đầu, mở rộng không gian đặc trưng.

from pyspark.sql import SparkSession

from pyspark.ml.feature import PolynomialExpansion

from pyspark.ml.linalg import Vectors

  

spark = SparkSession.builder.appName("PolynomialExpansion Example").getOrCreate()

  

df = spark.createDataFrame([

    (Vectors.dense([2.0, 1.0]),),

    (Vectors.dense([0.0, 0.0]),),

    (Vectors.dense([3.0, -1.0]),)

], ["features"])

  

polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")

polyDF = polyExpansion.transform(df)

  

polyDF.show(truncate=False)

Output: Pasted image 20250320164223.png

DCT (Biến đổi cosine rời rạc)

Chuyển đổi dữ liệu từ miền thời gian sang miền tần số, hữu ích trong xử lý tín hiệu.

from pyspark.sql import SparkSession

from pyspark.ml.feature import DCT

from pyspark.ml.linalg import Vectors

  

spark = SparkSession.builder.appName("DCT Example").getOrCreate()

  
  

df = spark.createDataFrame([

    (Vectors.dense([0.0, 1.0, -2.0, 3.0]),),

    (Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),

    (Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])

  

dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")

  

dctDf = dct.transform(df)

  

dctDf.select("featuresDCT").show(truncate=False)

output: Pasted image 20250320164425.png

StringIndexer:

Chuyển đổi các giá trị chuỗi thành số nguyên, giúp mô hình học máy xử lý dễ dàng hơn.

from pyspark.sql import SparkSession

from pyspark.ml.feature import StringIndexer

  

spark = SparkSession.builder.appName("StringIndexer Example").getOrCreate()

  

df = spark.createDataFrame(

    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],

    ["id", "category"])

  

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")

indexed = indexer.fit(df).transform(df)

indexed.show()

Output: Pasted image 20250320164639.png

IndexToString

Chuyển đổi các chỉ số số nguyên trở lại giá trị chuỗi ban đầu.

from pyspark.sql import SparkSession

from pyspark.ml.feature import IndexToString, StringIndexer

  

spark = SparkSession.builder.appName("IndexToString Example").getOrCreate()

  

df = spark.createDataFrame(

    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],

    ["id", "category"])

  

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")

model = indexer.fit(df)

indexed = model.transform(df)

  

print("Transformed string column '%s' to indexed column '%s'"

      % (indexer.getInputCol(), indexer.getOutputCol()))

indexed.show()

  

print("StringIndexer will store labels in output column metadata\n")

  

converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")

converted = converter.transform(indexed)

  

print("Transformed indexed column '%s' back to original string column '%s' using "

      "labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))

converted.select("id", "categoryIndex", "originalCategory").show()

Output: Pasted image 20250320165001.png
Pasted image 20250320165018.png

OneHotEncoder

Mã hóa các giá trị phân loại thành vector nhị phân, giúp mô hình hiểu rõ hơn về các danh mục.

from pyspark.sql import SparkSession

from pyspark.ml.feature import OneHotEncoder

  

spark = SparkSession.builder.appName("OneHotEncoder Example").getOrCreate()

  

df = spark.createDataFrame([

    (0.0, 1.0),

    (1.0, 0.0),

    (2.0, 1.0),

    (0.0, 2.0),

    (0.0, 1.0),

    (2.0, 0.0)

], ["categoryIndex1", "categoryIndex2"])

  

encoder = OneHotEncoder(inputCols=["categoryIndex1", "categoryIndex2"],

                        outputCols=["categoryVec1", "categoryVec2"])

model = encoder.fit(df)

encoded = model.transform(df)

encoded.show()

Output: Pasted image 20250320165235.png

VectorIndexer

Tự động xác định và mã hóa các đặc trưng phân loại trong vector, giảm công sức tiền xử lý.

[from pyspark.sql import SparkSession

from pyspark.ml.feature import VectorIndexer

  

spark = SparkSession.builder.appName("OneHotEncoder Example").getOrCreate()

  

data = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")

  
  

indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)

indexerModel = indexer.fit(data)

  

categoricalFeatures = indexerModel.categoryMaps

print("Chose %d categorical features: %s" %

      (len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))

  

# Create new column "indexed" with categorical values transformed to indices

indexedData = indexerModel.transform(data)

indexedData.show()](<from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorIndexer

spark = SparkSession.builder.appName("VectorIndexer Example").getOrCreate()

data = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")


indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)

categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
      (len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))

# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()>)

Output: Pasted image 20250320165447.png

Interaction

Tạo ra các đặc trưng tương tác bằng cách kết hợp các đặc trưng hiện có.

from pyspark.sql import SparkSession

from pyspark.ml.feature import Interaction, VectorAssembler

  
  

spark = SparkSession.builder.appName("Interaction Example").getOrCreate()

  

df = spark.createDataFrame(

    [(1, 1, 2, 3, 8, 4, 5),

     (2, 4, 3, 8, 7, 9, 8),

     (3, 6, 1, 9, 2, 3, 6),

     (4, 10, 8, 6, 9, 4, 5),

     (5, 9, 2, 7, 10, 7, 3),

     (6, 1, 1, 4, 2, 8, 4)],

    ["id1", "id2", "id3", "id4", "id5", "id6", "id7"])

  

assembler1 = VectorAssembler(inputCols=["id2", "id3", "id4"], outputCol="vec1")

  

assembled1 = assembler1.transform(df)

  

assembler2 = VectorAssembler(inputCols=["id5", "id6", "id7"], outputCol="vec2")

  

assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")

  

interaction = Interaction(inputCols=["id1", "vec1", "vec2"], outputCol="interactedCol")

  

interacted = interaction.transform(assembled2)

  

interacted.show(truncate=False)

Output: Pasted image 20250320170120.png

Normalizer

Chuẩn hóa các vector đặc trưng về độ dài 1, giúp mô hình không bị ảnh hưởng bởi độ lớn của đặc trưng.

from pyspark.sql import SparkSession

from pyspark.ml.feature import Normalizer

from pyspark.ml.linalg import Vectors

  

spark = SparkSession.builder.appName("Normalizer Example").getOrCreate()

  

dataFrame = spark.createDataFrame([

    (0, Vectors.dense([1.0, 0.5, -1.0]),),

    (1, Vectors.dense([2.0, 1.0, 1.0]),),

    (2, Vectors.dense([4.0, 10.0, 2.0]),)

], ["id", "features"])

  

# Normalize each Vector using $L^1$ norm.

normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)

l1NormData = normalizer.transform(dataFrame)

print("Normalized using L^1 norm")

l1NormData.show()

  

# Normalize each Vector using $L^\infty$ norm.

lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})

print("Normalized using L^inf norm")

lInfNormData.show()

Output: Pasted image 20250320170735.png

StandardScaler

Chuẩn hóa các đặc trưng để có trung bình bằng 0 và độ lệch chuẩn bằng 1, giúp mô hình học hiệu quả hơn

from pyspark.sql import SparkSession

from pyspark.ml.feature import StandardScaler

  
  

spark = SparkSession.builder.appName("StandardScaler Example").getOrCreate()

  

dataFrame = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")

  
  

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",

                        withStd=True, withMean=False)

  

# Compute summary statistics by fitting the StandardScaler

scalerModel = scaler.fit(dataFrame)

  

# Normalize each feature to have unit standard deviation.

scaledData = scalerModel.transform(dataFrame)

scaledData.show()

Output: Pasted image 20250320171040.png

RobustScaler

Sử dụng các thống kê bền vững để chuẩn hóa đặc trưng, giảm ảnh hưởng của ngoại lệ.

from pyspark.sql import SparkSession

from pyspark.ml.feature import RobustScaler

  
  

spark = SparkSession.builder.appName("RobustScaler Example").getOrCreate()

  

dataFrame = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")

  

scaler = RobustScaler(inputCol="features", outputCol="scaledFeatures",

                      withScaling=True, withCentering=False,

                      lower=0.25, upper=0.75)

  

# Compute summary statistics by fitting the RobustScaler

scalerModel = scaler.fit(dataFrame)

  

# Transform each feature to have unit quantile range.

scaledData = scalerModel.transform(dataFrame)

scaledData.show()

Output: Pasted image 20250320171251.png

MinMaxScaler

Tỉ lệ hóa các đặc trưng về khoảng giá trị [0, 1], giúp đồng nhất các đặc trưng.

from pyspark.sql import SparkSession

from pyspark.ml.feature import MinMaxScaler

from pyspark.ml.linalg import Vectors

  
  

spark = SparkSession.builder.appName("MinMaxScaler Example").getOrCreate()

  
  

dataFrame = spark.createDataFrame([

    (0, Vectors.dense([1.0, 0.1, -1.0]),),

    (1, Vectors.dense([2.0, 1.1, 1.0]),),

    (2, Vectors.dense([3.0, 10.1, 3.0]),)

], ["id", "features"])

  

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

  

# Compute summary statistics and generate MinMaxScalerModel

scalerModel = scaler.fit(dataFrame)

  

# rescale each feature to range [min, max].

scaledData = scalerModel.transform(dataFrame)

print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))

scaledData.select("features", "scaledFeatures").show()

Output: Pasted image 20250320171442.png

MaxAbsScaler

Tỉ lệ hóa các đặc trưng dựa trên giá trị tuyệt đối lớn nhất, giữ nguyên dấu của đặc trưng.

from pyspark.sql import SparkSession

from pyspark.ml.feature import MaxAbsScaler

from pyspark.ml.linalg import Vectors

  

spark = SparkSession.builder.appName("MaxAbsScaler Example").getOrCreate()

  

dataFrame = spark.createDataFrame([

    (0, Vectors.dense([1.0, 0.1, -8.0]),),

    (1, Vectors.dense([2.0, 1.0, -4.0]),),

    (2, Vectors.dense([4.0, 10.0, 8.0]),)

], ["id", "features"])

  

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

  

# Compute summary statistics and generate MaxAbsScalerModel

scalerModel = scaler.fit(dataFrame)

  

# rescale each feature to range [-1, 1].

scaledData = scalerModel.transform(dataFrame)

  

scaledData.select("features", "scaledFeatures").show()

Output: Pasted image 20250320171623.png

Bucketizer

Chia các giá trị liên tục thành các nhóm (bucket) dựa trên các ngưỡng xác định.

from pyspark.sql import SparkSession

from pyspark.ml.feature import Bucketizer

  
  

spark = SparkSession.builder.appName("Bucketizer Example").getOrCreate()

  

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]

  

data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]

dataFrame = spark.createDataFrame(data, ["features"])

  

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

  

# Transform original data into its bucket index.

bucketedData = bucketizer.transform(dataFrame)

  

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits()) - 1))

bucketedData.show()

Output: Pasted image 20250320192236.png

ElementwiseProduct

Nhân từng phần tử của vector đặc trưng với một vector trọng số, giúp điều chỉnh tầm quan trọng của từng đặc trưng.

from pyspark.sql import SparkSession

from pyspark.ml.feature import ElementwiseProduct

from pyspark.ml.linalg import Vectors

  
  

spark = SparkSession.builder.appName("ElementwiseProduct Example").getOrCreate()

  
  

# Create some vector data; also works for sparse vectors

data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]

df = spark.createDataFrame(data, ["vector"])

transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),

                                 inputCol="vector", outputCol="transformedVector")

# Batch transform the vectors to create new column:

transformer.transform(df).show()

Output: Pasted image 20250320192422.png

SQLTransformer

Áp dụng các phép biến đổi SQL trên DataFrame, linh hoạt trong việc xử lý và biến đổi dữ liệu.

from pyspark.sql import SparkSession

from pyspark.ml.feature import SQLTransformer

  
  

spark = SparkSession.builder.appName("SQLTransformer Example").getOrCreate()

  

df = spark.createDataFrame([

    (0, 1.0, 3.0),

    (2, 2.0, 5.0)

], ["id", "v1", "v2"])

sqlTrans = SQLTransformer(

    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

sqlTrans.transform(df).show()

Output: Pasted image 20250320192543.png

VectorAssembler

Kết hợp nhiều cột đặc trưng thành một cột vector duy nhất, chuẩn bị dữ liệu cho mô hình học máy.

from pyspark.sql import SparkSession

from pyspark.ml.linalg import Vectors

from pyspark.ml.feature import VectorAssembler

  
  

spark = SparkSession.builder.appName("VectorAssembler Example").getOrCreate()

  

dataset = spark.createDataFrame(

    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],

    ["id", "hour", "mobile", "userFeatures", "clicked"])

  

assembler = VectorAssembler(

    inputCols=["hour", "mobile", "userFeatures"],

    outputCol="features")

  

output = assembler.transform(dataset)

print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")

output.select("features", "clicked").show(truncate=False)

Output: Pasted image 20250320192719.png

VectorSizeHint

Cung cấp thông tin về kích thước của vector đặc trưng, giúp tránh lỗi trong quá trình xử lý.

from pyspark.sql import SparkSession

from pyspark.ml.linalg import Vectors

from pyspark.ml.feature import (VectorSizeHint, VectorAssembler)

  
  

spark = SparkSession.builder.appName("VectorSizeHint Example").getOrCreate()

  

dataset = spark.createDataFrame(

    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0),

     (0, 18, 1.0, Vectors.dense([0.0, 10.0]), 0.0)],

    ["id", "hour", "mobile", "userFeatures", "clicked"])

  

sizeHint = VectorSizeHint(

    inputCol="userFeatures",

    handleInvalid="skip",

    size=3)

  

datasetWithSize = sizeHint.transform(dataset)

print("Rows where 'userFeatures' is not the right size are filtered out")

datasetWithSize.show(truncate=False)

  

assembler = VectorAssembler(

    inputCols=["hour", "mobile", "userFeatures"],

    outputCol="features")

  

# This dataframe can be used by downstream transformers as before

output = assembler.transform(datasetWithSize)

print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")

output.select("features", "clicked").show(truncate=False)

Output: Pasted image 20250320192837.png

QuantileDiscretizer

Chia các giá trị liên tục thành các nhóm dựa trên phân vị, giúp xử lý các đặc trưng không phân phối đều.

from pyspark.sql import SparkSession

from pyspark.ml.feature import QuantileDiscretizer

  
  

spark = SparkSession.builder.appName("QuantileDiscretizer Example").getOrCreate()

  

data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)]

df = spark.createDataFrame(data, ["id", "hour"])

  

discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")

  

result = discretizer.fit(df).transform(df)

result.show()

Output: Pasted image 20250320193154.png

Imputer

Điền giá trị thiếu trong dữ liệu bằng các chiến lược như trung bình hoặc trung vị, đảm bảo dữ liệu đầy đủ cho mô hình.

from pyspark.sql import SparkSession

from pyspark.ml.feature import Imputer

  
  

spark = SparkSession.builder.appName("Imputer Example").getOrCreate()

  

df = spark.createDataFrame([

    (1.0, float("nan")),

    (2.0, float("nan")),

    (float("nan"), 3.0),

    (4.0, 4.0),

    (5.0, 5.0)

], ["a", "b"])

  

imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])

model = imputer.fit(df)

  

model.transform(df).show()

Output: Pasted image 20250320193342.png

3. Lựa chọn đặc trưng (Feature Selectors)

VectorSlicer

Chọn một tập con các đặc trưng từ vector đặc trưng ban đầu dựa trên chỉ số, giúp giảm kích thước không gian đặc trưng.

from pyspark.sql import SparkSession

from pyspark.ml.feature import VectorSlicer

from pyspark.ml.linalg import Vectors

from pyspark.sql.types import Row

  
  

spark = SparkSession.builder.appName("VectorSlicer Example").getOrCreate()

  

df = spark.createDataFrame([

    Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3})),

    Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]))])

  

slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])

  

output = slicer.transform(df)

  

output.select("userFeatures", "features").show()

Output: Pasted image 20250320193705.png

RFormula

Sử dụng cú pháp R để chỉ định các đặc trưng và nhãn, đơn giản hóa việc xây dựng mô hình.

from pyspark.sql import SparkSession

from pyspark.ml.feature import RFormula


spark = SparkSession.builder.appName("VectorSlicer Example").getOrCreate()

  

dataset = spark.createDataFrame(

    [(7, "US", 18, 1.0),

     (8, "CA", 12, 0.0),

     (9, "NZ", 15, 0.0)],

    ["id", "country", "hour", "clicked"])

  

formula = RFormula(

    formula="clicked ~ country + hour",

    featuresCol="features",

    labelCol="label")

  

output = formula.fit(dataset).transform(dataset)

output.select("features", "label").show()

Output: Pasted image 20250320193904.png

ChiSqSelector

Sử dụng kiểm định Chi-squared để chọn các đặc trưng quan trọng nhất, hữu ích trong các bài toán phân loại.

from pyspark.sql import SparkSession

from pyspark.ml.feature import ChiSqSelector

from pyspark.ml.linalg import Vectors

  
  

spark = SparkSession.builder.appName("ChiSqSelector Example").getOrCreate()

  

df = spark.createDataFrame([

    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),

    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),

    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])

  

selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",

                         outputCol="selectedFeatures", labelCol="clicked")

  

result = selector.fit(df).transform(df)

  

print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())

result.show()

Output: Pasted image 20250320194035.png

UnivariateFeatureSelector

Chọn các đặc trưng dựa trên các thống kê đơn biến, giúp loại bỏ các đặc trưng không liên quan.

from pyspark.sql import SparkSession

from pyspark.ml.feature import UnivariateFeatureSelector

from pyspark.ml.linalg import Vectors

  

spark = SparkSession.builder.appName("UnivariateFeatureSelector Example").getOrCreate()

  

df = spark.createDataFrame([

    (1, Vectors.dense([1.7, 4.4, 7.6, 5.8, 9.6, 2.3]), 3.0,),

    (2, Vectors.dense([8.8, 7.3, 5.7, 7.3, 2.2, 4.1]), 2.0,),

    (3, Vectors.dense([1.2, 9.5, 2.5, 3.1, 8.7, 2.5]), 3.0,),

    (4, Vectors.dense([3.7, 9.2, 6.1, 4.1, 7.5, 3.8]), 2.0,),

    (5, Vectors.dense([8.9, 5.2, 7.8, 8.3, 5.2, 3.0]), 4.0,),

    (6, Vectors.dense([7.9, 8.5, 9.2, 4.0, 9.4, 2.1]), 4.0,)], ["id", "features", "label"])

  

selector = UnivariateFeatureSelector(featuresCol="features", outputCol="selectedFeatures",

                                     labelCol="label", selectionMode="numTopFeatures")

selector.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(1)

  

result = selector.fit(df).transform(df)

  

print("UnivariateFeatureSelector output with top %d features selected using f_classif"

      % selector.getSelectionThreshold())

result.show()

Output: Pasted image 20250320194226.png

VarianceThresholdSelector

Loại bỏ các đặc trưng có phương sai thấp, thường không mang nhiều thông tin cho mô hình.

from pyspark.sql import SparkSession

from pyspark.ml.feature import VarianceThresholdSelector

from pyspark.ml.linalg import Vectors

  

spark = SparkSession.builder.appName("VarianceThresholdSelector Example").getOrCreate()

  

df = spark.createDataFrame([

    (1, Vectors.dense([6.0, 7.0, 0.0, 7.0, 6.0, 0.0])),

    (2, Vectors.dense([0.0, 9.0, 6.0, 0.0, 5.0, 9.0])),

    (3, Vectors.dense([0.0, 9.0, 3.0, 0.0, 5.0, 5.0])),

    (4, Vectors.dense([0.0, 9.0, 8.0, 5.0, 6.0, 4.0])),

    (5, Vectors.dense([8.0, 9.0, 6.0, 5.0, 4.0, 4.0])),

    (6, Vectors.dense([8.0, 9.0, 6.0, 0.0, 0.0, 0.0]))], ["id", "features"])

  

selector = VarianceThresholdSelector(varianceThreshold=8.0, outputCol="selectedFeatures")

  

result = selector.fit(df).transform(df)

  

print("Output: Features with variance lower than %f are removed." %

      selector.getVarianceThreshold())

result.show()

Output: Pasted image 20250320194406.png

4. Băm nhạy cảm cục bộ (Locality Sensitive Hashing - LSH)

Feature Transformation

  • Chuyển đổi các đặc trưng thành các mã băm (hash codes) để hỗ trợ tìm kiếm gần đúng (approximate nearest neighbor search).
  • Mục tiêu là giữ các điểm gần nhau trong không gian đầu vào tiếp tục gần nhau trong không gian mã băm.

Approximate Similarity Join

  • Tìm các cặp điểm trong hai tập dữ liệu sao cho khoảng cách giữa chúng nhỏ hơn một ngưỡng xác định trước.
  • Hữu ích trong việc tìm kiếm các điểm dữ liệu tương tự mà không cần phải tính toán toàn bộ khoảng cách giữa tất cả các điểm.
  • Tìm điểm dữ liệu gần nhất trong tập dữ liệu dựa trên mã băm được tạo.
  • Hiệu quả hơn so với tìm kiếm trực tiếp trên toàn bộ không gian dữ liệu.

5. Các phương pháp LSH trong Spark MLlib

Bucketed Random Projection for Euclidean Distance

  • Dùng để tìm kiếm gần đúng dựa trên khoảng cách Euclidean.
  • Phân vùng dữ liệu thành các "bucket" bằng cách sử dụng phép chiếu ngẫu nhiên.
  • Giảm số lượng phép tính khoảng cách khi tìm điểm gần nhất.

The Euclidean distance is defined:

Hash bucket:

Code python:

from pyspark.sql import SparkSession

from pyspark.ml.feature import BucketedRandomProjectionLSH

from pyspark.ml.linalg import Vectors

from pyspark.sql.functions import col

  

spark = SparkSession.builder.appName("VarianceThresholdSelector Example").getOrCreate()

  

dataA = [(0, Vectors.dense([1.0, 1.0]),),

         (1, Vectors.dense([1.0, -1.0]),),

         (2, Vectors.dense([-1.0, -1.0]),),

         (3, Vectors.dense([-1.0, 1.0]),)]

dfA = spark.createDataFrame(dataA, ["id", "features"])

  

dataB = [(4, Vectors.dense([1.0, 0.0]),),

         (5, Vectors.dense([-1.0, 0.0]),),

         (6, Vectors.dense([0.0, 1.0]),),

         (7, Vectors.dense([0.0, -1.0]),)]

dfB = spark.createDataFrame(dataB, ["id", "features"])

  

key = Vectors.dense([1.0, 0.0])

  

brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,

                                  numHashTables=3)

model = brp.fit(dfA)

  

# Feature Transformation

print("The hashed dataset where hashed values are stored in the column 'hashes':")

model.transform(dfA).show()

  

# Compute the locality sensitive hashes for the input rows, then perform approximate

# similarity join.

# We could avoid computing hashes by passing in the already-transformed dataset, e.g.

# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`

print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")

model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\

    .select(col("datasetA.id").alias("idA"),

            col("datasetB.id").alias("idB"),

            col("EuclideanDistance")).show()

  

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest

# neighbor search.

# We could avoid computing hashes by passing in the already-transformed dataset, e.g.

# `model.approxNearestNeighbors(transformedA, key, 2)`

print("Approximately searching dfA for 2 nearest neighbors of the key:")

model.approxNearestNeighbors(dfA, key, 2).show()

Output: Pasted image 20250320195308.png

MinHash for Jaccard Distance

  • Được sử dụng để tính toán khoảng cách Jaccard giữa các tập hợp.
  • MinHash tạo ra các chữ ký (signatures) nhỏ gọn cho từng tập hợp, giúp so sánh nhanh chóng.
  • Phù hợp với dữ liệu dạng tập hợp (như tập hợp từ trong văn bản).
    Khoảng cách Jaccard của hai tập hợp được xác định bởi số lượng phần tử của giao và hợp của chúng

MinHash:

Code python

from pyspark.sql import SparkSession

from pyspark.ml.feature import MinHashLSH

from pyspark.ml.linalg import Vectors

from pyspark.sql.functions import col

  

spark = SparkSession.builder.appName("VarianceThresholdSelector Example").getOrCreate()

  

dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),

         (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),

         (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]

dfA = spark.createDataFrame(dataA, ["id", "features"])

  

dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),

         (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),

         (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]

dfB = spark.createDataFrame(dataB, ["id", "features"])

  

key = Vectors.sparse(6, [1, 3], [1.0, 1.0])

  

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)

model = mh.fit(dfA)

  

# Feature Transformation

print("The hashed dataset where hashed values are stored in the column 'hashes':")

model.transform(dfA).show()

  

# Compute the locality sensitive hashes for the input rows, then perform approximate

# similarity join.

# We could avoid computing hashes by passing in the already-transformed dataset, e.g.

# `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`

print("Approximately joining dfA and dfB on distance smaller than 0.6:")

model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\

    .select(col("datasetA.id").alias("idA"),

            col("datasetB.id").alias("idB"),

            col("JaccardDistance")).show()

  

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest

# neighbor search.

# We could avoid computing hashes by passing in the already-transformed dataset, e.g.

# `model.approxNearestNeighbors(transformedA, key, 2)`

# It may return less than 2 rows when not enough approximate near-neighbor candidates are

# found.

print("Approximately searching dfA for 2 nearest neighbors of the key:")

model.approxNearestNeighbors(dfA, key, 2).show()

Output: Pasted image 20250320195431.png


V. Phân loại và Hồi quy (Classification and regression)

Phân loại và hồi quy là hai nhiệm vụ chính trong học máy:

  • Phân loại: Dự đoán nhãn phân loại cho các quan sát, chẳng hạn như phân loại email là "spam" hoặc "không spam".
  • Hồi quy: Dự đoán giá trị số liên tục, chẳng hạn như dự đoán giá nhà dựa trên các đặc trưng như diện tích, vị trí, v.v.

1. Phân loại

Hồi quy Logistic (Logistic Regression)

Dùng để dự đoán xác suất của một nhãn phân loại. Spark hỗ trợ cả hồi quy logistic nhị phân và đa thức.

Binomial logistic regression

from pyspark.sql import SparkSession

from pyspark.ml.classification import LogisticRegression

  

spark = SparkSession.builder.appName("logistic Example").getOrCreate()

  

# Load training data

training = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")

  
  

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

  

# Fit the model

lrModel = lr.fit(training)

  

# Print the coefficients and intercept for logistic regression

print("Coefficients: " + str(lrModel.coefficients))

print("Intercept: " + str(lrModel.intercept))

  

# We can also use the multinomial family for binary classification

mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

  

# Fit the model

mlrModel = mlr.fit(training)

  

# Print the coefficients and intercepts for logistic regression with multinomial family

print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))

print("Multinomial intercepts: " + str(mlrModel.interceptVector))

Output: Pasted image 20250320200635.png

  • Cải tiến thêm
from pyspark.sql import SparkSession

from pyspark.ml.classification import LogisticRegression

  

spark = SparkSession.builder.appName("logistic Example").getOrCreate()

  

# Load training data

training = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")

  
  

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

  

# Fit the model

lrModel = lr.fit(training)

  

# Print the coefficients and intercept for logistic regression

print("Coefficients: " + str(lrModel.coefficients))

print("Intercept: " + str(lrModel.intercept))

  

# We can also use the multinomial family for binary classification

mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

  

# Fit the model

mlrModel = mlr.fit(training)

  

# Extract the summary from the returned LogisticRegressionModel instance trained

# in the earlier example

trainingSummary = lrModel.summary

  

# Obtain the objective per iteration

objectiveHistory = trainingSummary.objectiveHistory

print("objectiveHistory:")

for objective in objectiveHistory:

    print(objective)

  

# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.

trainingSummary.roc.show()

print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

  

# Set the model threshold to maximize F-Measure

fMeasure = trainingSummary.fMeasureByThreshold

maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()

bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \

    .select('threshold').head()['threshold']

lr.setThreshold(bestThreshold)

Output: Pasted image 20250320201048.png

Multinomial logistic regression

The conditional probabilities of the outcome classes are modeled using the softmax function.

We minimize the weighted negative log-likelihood, using a multinomial response model, with elastic-net penalty to control for overfitting

from pyspark.sql import SparkSession

from pyspark.ml.classification import LogisticRegression

  

spark = SparkSession.builder.appName("logistic Example").getOrCreate()

  

# Load training data

training = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")

  
  

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

  

# Fit the model

lrModel = lr.fit(training)

  

# Print the coefficients and intercept for multinomial logistic regression

print("Coefficients: \n" + str(lrModel.coefficientMatrix))

print("Intercept: " + str(lrModel.interceptVector))

  

trainingSummary = lrModel.summary

  

# Obtain the objective per iteration

objectiveHistory = trainingSummary.objectiveHistory

print("objectiveHistory:")

for objective in objectiveHistory:

    print(objective)

  

# for multiclass, we can inspect metrics on a per-label basis

print("False positive rate by label:")

for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):

    print("label %d: %s" % (i, rate))

  

print("True positive rate by label:")

for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):

    print("label %d: %s" % (i, rate))

  

print("Precision by label:")

for i, prec in enumerate(trainingSummary.precisionByLabel):

    print("label %d: %s" % (i, prec))

  

print("Recall by label:")

for i, rec in enumerate(trainingSummary.recallByLabel):

    print("label %d: %s" % (i, rec))

  

print("F-measure by label:")

for i, f in enumerate(trainingSummary.fMeasureByLabel()):

    print("label %d: %s" % (i, f))

  

accuracy = trainingSummary.accuracy

falsePositiveRate = trainingSummary.weightedFalsePositiveRate

truePositiveRate = trainingSummary.weightedTruePositiveRate

fMeasure = trainingSummary.weightedFMeasure()

precision = trainingSummary.weightedPrecision

recall = trainingSummary.weightedRecall

print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"

      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

Output: Pasted image 20250320201630.png

Cây quyết định (Decision Tree)

Xây dựng mô hình phân loại dựa trên việc chia dữ liệu thành các nhóm con dựa trên các đặc trưng.

from pyspark.sql import SparkSession

from pyspark.ml import Pipeline

from pyspark.ml.classification import DecisionTreeClassifier

from pyspark.ml.feature import StringIndexer, VectorIndexer

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

  

spark = SparkSession.builder.appName("Class Example").getOrCreate()

  

# Load the data stored in LIBSVM format as a DataFrame.

data = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")

  

# Index labels, adding metadata to the label column.

# Fit on whole dataset to include all labels in index.

labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.

# We specify maxCategories so features with > 4 distinct values are treated as continuous.

featureIndexer =\

    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

  

# Split the data into training and test sets (30% held out for testing)

(trainingData, testData) = data.randomSplit([0.7, 0.3])

  

# Train a DecisionTree model.

dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

  

# Chain indexers and tree in a Pipeline

pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

  

# Train model.  This also runs the indexers.

model = pipeline.fit(trainingData)

  

# Make predictions.

predictions = model.transform(testData)

  

# Select example rows to display.

predictions.select("prediction", "indexedLabel", "features").show(5)

  

# Select (prediction, true label) and compute test error

evaluator = MulticlassClassificationEvaluator(

    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)

print("Test Error = %g " % (1.0 - accuracy))

  

treeModel = model.stages[2]

# summary only

print(treeModel)

Output: Pasted image 20250320215625.png

Rừng ngẫu nhiên (Random Forest)

Sử dụng nhiều cây quyết định để cải thiện độ chính xác và giảm thiểu overfitting.

from pyspark.sql import SparkSession

from pyspark.ml import Pipeline

from pyspark.ml.classification import RandomForestClassifier

from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

  

spark = SparkSession.builder.appName("Class Example").getOrCreate()

  

# Load the data stored in LIBSVM format as a DataFrame.

data = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")

  

# Index labels, adding metadata to the label column.

# Fit on whole dataset to include all labels in index.

labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

  

# Automatically identify categorical features, and index them.

# Set maxCategories so features with > 4 distinct values are treated as continuous.

featureIndexer =\

    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

  

# Split the data into training and test sets (30% held out for testing)

(trainingData, testData) = data.randomSplit([0.7, 0.3])

  

# Train a RandomForest model.

rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

  

# Convert indexed labels back to original labels.

labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",

                               labels=labelIndexer.labels)

  

# Chain indexers and forest in a Pipeline

pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

  

# Train model.  This also runs the indexers.

model = pipeline.fit(trainingData)

  

# Make predictions.

predictions = model.transform(testData)

  

# Select example rows to display.

predictions.select("predictedLabel", "label", "features").show(5)

  

# Select (prediction, true label) and compute test error

evaluator = MulticlassClassificationEvaluator(

    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)

print("Test Error = %g" % (1.0 - accuracy))

  

rfModel = model.stages[2]

print(rfModel)  # summary only

Output: Pasted image 20250320215954.png

Cây tăng cường độ dốc (Gradient-Boosted Tree)

Xây dựng mô hình bằng cách kết hợp nhiều cây quyết định được huấn luyện tuần tự, mỗi cây cố gắng sửa lỗi của cây trước đó.

from pyspark.sql import SparkSession

from pyspark.ml import Pipeline

from pyspark.ml.classification import GBTClassifier

from pyspark.ml.feature import StringIndexer, VectorIndexer

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

  

spark = SparkSession.builder.appName("Train-Validation split Example").getOrCreate()

  

# Load the data stored in LIBSVM format as a DataFrame.

data = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")

  
  

# Index labels, adding metadata to the label column.

# Fit on whole dataset to include all labels in index.

labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.

# Set maxCategories so features with > 4 distinct values are treated as continuous.

featureIndexer =\

    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

  

# Split the data into training and test sets (30% held out for testing)

(trainingData, testData) = data.randomSplit([0.7, 0.3])

  

# Train a GBT model.

gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)

  

# Chain indexers and GBT in a Pipeline

pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

  

# Train model.  This also runs the indexers.

model = pipeline.fit(trainingData)

  

# Make predictions.

predictions = model.transform(testData)

  

# Select example rows to display.

predictions.select("prediction", "indexedLabel", "features").show(5)

  

# Select (prediction, true label) and compute test error

evaluator = MulticlassClassificationEvaluator(

    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)

print("Test Error = %g" % (1.0 - accuracy))

  

gbtModel = model.stages[2]

print(gbtModel)  # summary only

Output: Pasted image 20250320220319.png

Máy vector hỗ trợ tuyến tính (Linear Support Vector Machine)

Tìm siêu phẳng phân tách các lớp trong không gian đặc trưng.

from pyspark.sql import SparkSession

from pyspark.ml.classification import LinearSVC

  

spark = SparkSession.builder.appName("Train-Validation split Example").getOrCreate()

  

# Load the data stored in LIBSVM format as a DataFrame.

training = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")

  
  

lsvc = LinearSVC(maxIter=10, regParam=0.1)

  

# Fit the model

lsvcModel = lsvc.fit(training)

  

# Print the coefficients and intercept for linear SVC

print("Coefficients: " + str(lsvcModel.coefficients))

print("Intercept: " + str(lsvcModel.intercept))

Output: Pasted image 20250320221033.png
Pasted image 20250320221048.png

Phân loại Naive Bayes

Dựa trên định lý Bayes và giả định độc lập giữa các đặc trưng.

from pyspark.sql import SparkSession

from pyspark.ml.classification import NaiveBayes

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

  

spark = SparkSession.builder.appName("Class Example").getOrCreate()

  

# Load the data stored in LIBSVM format as a DataFrame.

data = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")

  

# Split the data into train and test

splits = data.randomSplit([0.6, 0.4], 1234)

train = splits[0]

test = splits[1]

  

# create the trainer and set its parameters

nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

  

# train the model

model = nb.fit(train)

  

# select example rows to display.

predictions = model.transform(test)

predictions.show()

  

# compute accuracy on the test set

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",

                                              metricName="accuracy")

accuracy = evaluator.evaluate(predictions)

print("Test set accuracy = " + str(accuracy))

Output: Pasted image 20250320221357.png

Máy phân loại Perceptron nhiều lớp (Multilayer Perceptron Classifier)

Mạng nơ-ron nhiều lớp cho các nhiệm vụ phân loại.

Sigmoid:

Softmax func:

from pyspark.sql import SparkSession
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


  

spark = SparkSession.builder.appName("Class Example").getOrCreate()

  

# Load the data stored in LIBSVM format as a DataFrame.

data = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_multiclass_classification_data.txt")

  
  

# Split the data into train and test

splits = data.randomSplit([0.6, 0.4], 1234)

train = splits[0]

test = splits[1]

  

# specify layers for the neural network:

# input layer of size 4 (features), two intermediate of size 5 and 4

# and output of size 3 (classes)

layers = [4, 5, 4, 3]

  

# create the trainer and set its parameters

trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

  

# train the model

model = trainer.fit(train)

  

# compute accuracy on the test set

result = model.transform(test)

predictionAndLabels = result.select("prediction", "label")

evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

Output: Pasted image 20250320221723.png

Máy phân loại One-vs-Rest (One-vs-All)

Chiến lược phân loại đa lớp bằng cách huấn luyện một mô hình nhị phân cho mỗi lớp.

from pyspark.sql import SparkSession

from pyspark.ml.classification import LogisticRegression, OneVsRest

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

  

spark = SparkSession.builder.appName("Class Example").getOrCreate()

  

# Load the data stored in LIBSVM format as a DataFrame.

inputData = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_multiclass_classification_data.txt")

  

# generate the train/test split.

(train, test) = inputData.randomSplit([0.8, 0.2])

  

# instantiate the base classifier.

lr = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True)

  

# instantiate the One Vs Rest Classifier.

ovr = OneVsRest(classifier=lr)

  

# train the multiclass model.

ovrModel = ovr.fit(train)

  

# score the model on test data.

predictions = ovrModel.transform(test)

  

# obtain evaluator.

evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

  

# compute the classification error on test data.

accuracy = evaluator.evaluate(predictions)

print("Test Error = %g" % (1.0 - accuracy))

Output: Pasted image 20250320221955.png

Máy phân loại Factorization Machines

Kết hợp giữa hồi quy tuyến tính và mô hình tương tác bậc hai để xử lý dữ liệu thưa và có tính tương tác cao.

from pyspark.sql import SparkSession

from pyspark.ml import Pipeline

from pyspark.ml.classification import FMClassifier

from pyspark.ml.feature import MinMaxScaler, StringIndexer

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

  

spark = SparkSession.builder.appName("Class Example").getOrCreate()

  

# Load the data stored in LIBSVM format as a DataFrame.

data = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")

  

# Index labels, adding metadata to the label column.

# Fit on whole dataset to include all labels in index.

labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Scale features.

featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(data)

  

# Split the data into training and test sets (30% held out for testing)

(trainingData, testData) = data.randomSplit([0.7, 0.3])

  

# Train a FM model.

fm = FMClassifier(labelCol="indexedLabel", featuresCol="scaledFeatures", stepSize=0.001)

  

# Create a Pipeline.

pipeline = Pipeline(stages=[labelIndexer, featureScaler, fm])

  

# Train model.

model = pipeline.fit(trainingData)

  

# Make predictions.

predictions = model.transform(testData)

  

# Select example rows to display.

predictions.select("prediction", "indexedLabel", "features").show(5)

  

# Select (prediction, true label) and compute test accuracy

evaluator = MulticlassClassificationEvaluator(

    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)

print("Test set accuracy = %g" % accuracy)

  

fmModel = model.stages[2]

print("Factors: " + str(fmModel.factors))  # type: ignore

print("Linear: " + str(fmModel.linear))  # type: ignore

print("Intercept: " + str(fmModel.intercept))  # type: ignore

Output: Pasted image 20250320222217.png
Pasted image 20250320222241.png

2. Hồi quy

Hồi quy tuyến tính (Linear Regression)

Mô hình hóa mối quan hệ giữa biến phụ thuộc và một hoặc nhiều biến độc lập bằng đường thẳng.

Hồi quy tuyến tính tổng quát (Generalized Linear Regression)

Mở rộng hồi quy tuyến tính để hỗ trợ các phân phối khác nhau của biến phụ thuộc.

Hồi quy cây quyết định (Decision Tree Regression)

Sử dụng cây quyết định để dự đoán giá trị liên tục.

Hồi quy rừng ngẫu nhiên (Random Forest Regression)

Sử dụng nhiều cây quyết định để dự đoán giá trị liên tục, cải thiện độ chính xác và giảm thiểu overfitting.

Hồi quy cây tăng cường độ dốc (Gradient-Boosted Tree Regression)

Kết hợp nhiều cây quyết định được huấn luyện tuần tự để dự đoán giá trị liên tục.

Hồi quy sống sót (Survival Regression)

Mô hình hóa thời gian cho đến khi xảy ra một sự kiện quan tâm, thường được sử dụng trong phân tích sống sót.

Hồi quy đẳng hướng (Isotonic Regression)

Mô hình hồi quy đơn điệu, đảm bảo rằng dự đoán không giảm hoặc không tăng theo thứ tự của biến độc lập.

Máy hồi quy Factorization Machines

Kết hợp giữa hồi quy tuyến tính và mô hình tương tác bậc hai để xử lý dữ liệu thưa và có tính tương tác cao.

3. Phương pháp tuyến tính

Phương pháp tuyến tính là nền tảng của nhiều thuật toán phân loại và hồi quy, bao gồm hồi quy logistic và hồi quy tuyến tính. Chúng giả định mối quan hệ tuyến tính giữa các đặc trưng và biến mục tiêu.

4. Máy phân loại và hồi quy Factorization Machines

5. Cây quyết định Decision trees

6. Tập hợp cây (Tree Ensembles)

7. Đầu vào và Đầu ra của các mô hình cây và tập hợp cây

  • Cột đầu vào: Các đặc trưng được sử dụng để huấn luyện mô hình.
  • Cột đầu ra: Dự đoán hoặc xác suất dự đoán được tạo ra bởi mô hình.

VI. Thuật toán phân cụm (Clustering)

K-means

K-means là một trong những thuật toán phân cụm phổ biến nhất, nhóm các điểm dữ liệu thành số cụm được xác định trước. MLlib triển khai một biến thể song song của phương pháp k-means++ gọi là kmeans.

Các cột đầu vào:

  • featuresCol: Vector, mặc định là "features". Đây là vector đặc trưng của dữ liệu đầu vào.

Các cột đầu ra:

  • predictionCol: Int, mặc định là "prediction". Đây là cột chứa thông tin về cụm dự đoán cho mỗi điểm dữ liệu.
from pyspark.sql import SparkSession

from pyspark.ml.clustering import KMeans

from pyspark.ml.evaluation import ClusteringEvaluator

  

spark = SparkSession.builder.appName("logistic Example").getOrCreate()

  

# Loads data.

dataset = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_kmeans_data.txt")

  

# Trains a k-means model.

kmeans = KMeans().setK(2).setSeed(1)

model = kmeans.fit(dataset)

  

# Make predictions

predictions = model.transform(dataset)

  

# Evaluate clustering by computing Silhouette score

evaluator = ClusteringEvaluator()

  

silhouette = evaluator.evaluate(predictions)

print("Silhouette with squared euclidean distance = " + str(silhouette))

  

# Shows the result.

centers = model.clusterCenters()

print("Cluster Centers: ")

for center in centers:

    print(center)

Output: Pasted image 20250320204428.png

Phân bổ Dirichlet ẩn (Latent Dirichlet Allocation - LDA)

LDA là một thuật toán phân cụm chủ yếu được sử dụng cho mô hình chủ đề trong xử lý ngôn ngữ tự nhiên. Nó nhóm các tài liệu thành các chủ đề dựa trên phân phối từ ngữ.

from pyspark.sql import SparkSession

from pyspark.ml.clustering import LDA

  

spark = SparkSession.builder.appName("logistic Example").getOrCreate()

  

# Loads data.

dataset = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_lda_libsvm_data.txt")

  

# Trains a LDA model.

lda = LDA(k=10, maxIter=10)

model = lda.fit(dataset)

  

ll = model.logLikelihood(dataset)

lp = model.logPerplexity(dataset)

print("The lower bound on the log likelihood of the entire corpus: " + str(ll))

print("The upper bound on perplexity: " + str(lp))

  

# Describe topics.

topics = model.describeTopics(3)

print("The topics described by their top-weighted terms:")

topics.show(truncate=False)

  

# Shows the result

transformed = model.transform(dataset)

transformed.show(truncate=False)

Output: Pasted image 20250320204715.png

Bisecting k-means

Bisecting k-means là một biến thể của k-means, hoạt động bằng cách chia đôi các cụm lặp đi lặp lại cho đến khi đạt được số lượng cụm mong muốn. Phương pháp này thường dẫn đến các cụm có chất lượng cao hơn và ổn định hơn so với k-means truyền thống.

from pyspark.sql import SparkSession

from pyspark.ml.clustering import BisectingKMeans

from pyspark.ml.evaluation import ClusteringEvaluator

  

spark = SparkSession.builder.appName("logistic Example").getOrCreate()

  

# Loads data.

dataset = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_kmeans_data.txt")

  
  

# Trains a bisecting k-means model.

bkm = BisectingKMeans().setK(2).setSeed(1)

model = bkm.fit(dataset)

  

# Make predictions

predictions = model.transform(dataset)

  

# Evaluate clustering by computing Silhouette score

evaluator = ClusteringEvaluator()

  

silhouette = evaluator.evaluate(predictions)

print("Silhouette with squared euclidean distance = " + str(silhouette))

  

# Shows the result.

print("Cluster Centers: ")

centers = model.clusterCenters()

for center in centers:

    print(center)

Ouput: Pasted image 20250320204938.png

Mô hình Hỗn hợp Gaussian (Gaussian Mixture Model - GMM)

GMM giả định rằng dữ liệu được tạo ra từ sự kết hợp của nhiều phân phối Gaussian. Thuật toán này ước lượng các tham số của các phân phối Gaussian thành phần để mô hình hóa dữ liệu.

Các cột đầu vào:

  • featuresCol: Vector, mặc định là "features".

Các cột đầu ra:

  • predictionCol: Int, mặc định là "prediction".
from pyspark.sql import SparkSession

from pyspark.ml.clustering import GaussianMixture

  

spark = SparkSession.builder.appName("logistic Example").getOrCreate()

  

# Loads data.

dataset = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_kmeans_data.txt")

  

gmm = GaussianMixture().setK(2).setSeed(538009335)

model = gmm.fit(dataset)

  

print("Gaussians shown as a DataFrame: ")

model.gaussiansDF.show(truncate=False)

Output: Pasted image 20250320205256.png

Phân cụm Lặp lại theo Lũy thừa (Power Iteration Clustering - PIC)

Power Iteration Clustering (PIC) là một thuật toán phân cụm trên đồ thị có khả năng mở rộng, được phát triển bởi Lin và Cohen. Thuật toán này tìm một ánh xạ không gian có số chiều rất thấp cho dữ liệu bằng cách sử dụng phép lặp lũy thừa bị cắt trên ma trận tương đồng chuẩn hóa của dữ liệu.

Các tham số trong PowerIterationClustering của spark.ml

  • k: Số lượng cụm cần tạo.
  • initMode: Phương pháp khởi tạo thuật toán.
  • maxIter: Số lần lặp tối đa.
  • srcCol: Tên cột chứa ID đỉnh nguồn trong đồ thị đầu vào.
  • dstCol: Tên cột chứa ID đỉnh đích trong đồ thị đầu vào.
  • weightCol: Tên cột chứa trọng số cạnh giữa các đỉnh.
from pyspark.sql import SparkSession

from pyspark.ml.clustering import PowerIterationClustering

  

spark = SparkSession.builder.appName("logistic Example").getOrCreate()

  

df = spark.createDataFrame([

    (0, 1, 1.0),

    (0, 2, 1.0),

    (1, 2, 1.0),

    (3, 4, 1.0),

    (4, 0, 0.1)

], ["src", "dst", "weight"])

  

pic = PowerIterationClustering(k=2, maxIter=20, initMode="degree", weightCol="weight")

  

# Shows the cluster assignment

pic.assignClusters(df).show()

Output: Pasted image 20250320205627.png


VII. Collaborative Filtering (Lọc cộng tác)

Lọc cộng tác (Collaborative Filtering) là một kỹ thuật phổ biến trong hệ thống gợi ý, nhằm dự đoán sở thích của người dùng dựa trên hành vi của họ và những người dùng tương tự. Trong Spark MLlib, lọc cộng tác được thực hiện thông qua phương pháp phân rã ma trận dựa trên mô hình, trong đó người dùng và sản phẩm được biểu diễn bằng một tập hợp nhỏ các yếu tố tiềm ẩn, giúp dự đoán các mục còn thiếu trong ma trận người dùng-sản phẩm. Thuật toán được sử dụng để học các yếu tố tiềm ẩn này là Alternating Least Squares (ALS).

Các tham số chính trong ALS:

  • numBlocks: Số lượng khối mà người dùng và sản phẩm sẽ được chia để tính toán song song (mặc định là 10).
  • rank: Số lượng yếu tố tiềm ẩn trong mô hình (mặc định là 10).
  • maxIter: Số lần lặp tối đa (mặc định là 10).
  • regParam: Tham số điều chuẩn trong ALS (mặc định là 1.0).
  • implicitPrefs: Xác định việc sử dụng biến thể ALS cho phản hồi tường minh hay phản hồi ngầm định (mặc định là false, tức là sử dụng phản hồi tường minh).
  • alpha: Tham số áp dụng cho biến thể ALS với phản hồi ngầm định, điều chỉnh mức độ tin cậy cơ bản trong quan sát sở thích (mặc định là 1.0).
  • nonnegative: Xác định có sử dụng ràng buộc không âm cho bình phương tối thiểu hay không (mặc định là false).

Phản hồi tường minh và phản hồi ngầm định:

  • Phản hồi tường minh: Đề cập đến các đánh giá trực tiếp của người dùng về sản phẩm, chẳng hạn như xếp hạng phim.
  • Phản hồi ngầm định: Đề cập đến các hành vi gián tiếp của người dùng, như lượt xem, lượt nhấp, mua hàng, lượt thích. Trong trường hợp này, dữ liệu được coi là biểu thị mức độ quan sát hành động của người dùng, và các con số này liên quan đến mức độ tin cậy trong việc quan sát sở thích của người dùng, thay vì đánh giá tường minh về sản phẩm.

Quy mô của tham số điều chuẩn:

Trong quá trình cập nhật các yếu tố người dùng và sản phẩm, tham số điều chuẩn regParam được chia tỷ lệ dựa trên số lượng đánh giá mà người dùng hoặc sản phẩm nhận được. Phương pháp này, được gọi là "ALS-WR", giúp regParam ít phụ thuộc hơn vào quy mô của tập dữ liệu, cho phép áp dụng tham số tốt nhất học được từ một tập con mẫu vào toàn bộ tập dữ liệu.

Chiến lược khởi động lạnh (Cold-start):

Trong hệ thống gợi ý, vấn đề khởi động lạnh xảy ra khi có người dùng hoặc sản phẩm mới mà không có đánh giá trước đó. Để giải quyết vấn đề này, mô hình ALS trong Spark cung cấp tham số coldStartStrategy. Khi được đặt là "drop", bất kỳ dự đoán nào chứa người dùng hoặc sản phẩm chưa thấy sẽ bị loại bỏ khỏi tập kết quả, đảm bảo không có giá trị dự đoán không xác định trong đánh giá mô hình.

Lưu ý: API dựa trên DataFrame cho ALS hiện chỉ hỗ trợ các giá trị số nguyên cho ID người dùng và sản phẩm. Các kiểu số khác có thể được sử dụng cho các cột ID này, nhưng giá trị ID phải nằm trong phạm vi của kiểu số nguyên.

Code ví dụ

from pyspark.sql import SparkSession

from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.recommendation import ALS

from pyspark.sql import Row

  

spark = SparkSession.builder.appName("logistic Example").getOrCreate()

  

# Loads data.

lines = spark.read.text("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_movielens_ratings.txt").rdd

  
  
  

parts = lines.map(lambda row: row.value.split("::"))

ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),

                                     rating=float(p[2]), timestamp=int(p[3])))

ratings = spark.createDataFrame(ratingsRDD)

(training, test) = ratings.randomSplit([0.8, 0.2])

  

# Build the recommendation model using ALS on the training data

# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics

als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",

          coldStartStrategy="drop")

model = als.fit(training)

  

# Evaluate the model by computing the RMSE on the test data

predictions = model.transform(test)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",

                                predictionCol="prediction")

rmse = evaluator.evaluate(predictions)

print("Root-mean-square error = " + str(rmse))

  

# Generate top 10 movie recommendations for each user

userRecs = model.recommendForAllUsers(10)

# Generate top 10 user recommendations for each movie

movieRecs = model.recommendForAllItems(10)

  

# Generate top 10 movie recommendations for a specified set of users

users = ratings.select(als.getUserCol()).distinct().limit(3)

userSubsetRecs = model.recommendForUserSubset(users, 10)

# Generate top 10 user recommendations for a specified set of movies

movies = ratings.select(als.getItemCol()).distinct().limit(3)

movieSubSetRecs = model.recommendForItemSubset(movies, 10)

output: Pasted image 20250320210616.png


VII. Khai thác Mẫu thường xuyên (Frequent Pattern Mining)

FP-Growth

Thuật toán FP-Growth, được mô tả trong bài báo của Han và cộng sự, là một phương pháp hiệu quả để tìm các tập mục thường xuyên mà không cần tạo ra các tập ứng viên, giúp giảm chi phí tính toán. Trong Spark, một phiên bản song song của FP-Growth, gọi là PFP (Parallel FP-Growth), đã được triển khai, giúp mở rộng khả năng xử lý trên các tập dữ liệu lớn.

Các tham số chính trong triển khai FP-Growth của spark.ml:

  • minSupport: Ngưỡng hỗ trợ tối thiểu để một tập mục được coi là thường xuyên. Ví dụ, nếu một mục xuất hiện trong 3 trên 5 giao dịch, nó có hỗ trợ là 3/5 = 0.6.
  • minConfidence: Ngưỡng độ tin cậy tối thiểu để tạo ra luật kết hợp. Độ tin cậy cho biết mức độ thường xuyên mà một luật kết hợp được tìm thấy là đúng. Ví dụ, nếu tập mục X xuất hiện 4 lần trong các giao dịch và X cùng Y xuất hiện cùng nhau 2 lần, độ tin cậy cho luật X => Y là 2/4 = 0.5. Tham số này không ảnh hưởng đến việc khai thác các tập mục thường xuyên, nhưng xác định độ tin cậy tối thiểu để tạo ra các luật kết hợp từ các tập mục thường xuyên.
  • numPartitions: Số lượng phân vùng được sử dụng để phân phối công việc. Mặc định, tham số này không được đặt và số lượng phân vùng của tập dữ liệu đầu vào sẽ được sử dụng.

FPGrowthModel cung cấp:

  • freqItemsets: Các tập mục thường xuyên dưới dạng DataFrame với các cột:
    • items: array: Tập mục cụ thể.
    • freq: long: Số lần tập mục này xuất hiện, dựa trên các tham số mô hình đã cấu hình.
  • associationRules: Các luật kết hợp được tạo ra với độ tin cậy trên minConfidence, dưới dạng DataFrame với các cột:
    • antecedent: array: Tập mục giả thuyết của luật kết hợp.
    • consequent: array: Tập mục kết luận của luật kết hợp, luôn chứa một phần tử duy nhất.
    • confidence: double: Độ tin cậy của luật kết hợp.
    • lift: double: Một thước đo cho biết mức độ mà giả thuyết dự đoán kết luận, được tính bằng công thức support(antecedent U consequent) / (support(antecedent) x support(consequent)).
    • support: double: Tỷ lệ xuất hiện của cả giả thuyết và kết luận trong tập dữ liệu.

Output: Pasted image 20250320211209.png
Pasted image 20250320211233.png
Pasted image 20250320211251.png

PrefixSpan

Thuật toán PrefixSpan (Prefix-projected Sequential pattern mining) được sử dụng để khai thác các mẫu tuần tự thường xuyên trong tập dữ liệu. Trong Spark, thuật toán này được triển khai để tìm các chuỗi con xuất hiện thường xuyên trong các chuỗi dài hơn, hữu ích trong các ứng dụng như phân tích hành vi người dùng hoặc phát hiện mẫu trong dữ liệu thời gian.

Apache Spark cung cấp một triển khai thuật toán PrefixSpan trong spark.ml để khai thác các mẫu tuần tự thường xuyên. Dưới đây là mô tả chi tiết về các tham số quan trọng của thuật toán này:

  1. minSupport:

    • Ngưỡng hỗ trợ tối thiểu để một mẫu tuần tự được coi là thường xuyên.
    • Ví dụ: Nếu một mẫu xuất hiện trong 30% của tất cả các chuỗi trong tập dữ liệu, thì nó có support = 0.3.
  2. maxPatternLength:

    • Độ dài tối đa của một mẫu tuần tự.
    • Nếu một mẫu vượt quá độ dài này, nó sẽ không được đưa vào kết quả.
  3. maxLocalProjDBSize:

    • Số lượng mục tối đa được phép trong một prefix-projected database trước khi thuật toán bắt đầu xử lý cục bộ.
    • Tham số này giúp kiểm soát mức độ phân tán công việc và cần được điều chỉnh tùy theo kích thước bộ nhớ của các executors.
  4. sequenceCol:

    • Tên của cột chứa các chuỗi trong tập dữ liệu (mặc định là "sequence").
    • Các hàng có giá trị null trong cột này sẽ bị bỏ qua.
from pyspark.sql import SparkSession

from pyspark.ml.fpm import PrefixSpan

from pyspark.sql import Row

  

spark = SparkSession.builder.appName("PrefixSpan Example").getOrCreate()

  

df = spark.createDataFrame([Row(sequence=[[1, 2], [3]]),

                     Row(sequence=[[1], [3, 2], [1, 2]]),

                     Row(sequence=[[1, 2], [5]]),

                     Row(sequence=[[6]])])

  

prefixSpan = PrefixSpan(minSupport=0.5, maxPatternLength=5,

                        maxLocalProjDBSize=32000000)

  

# Find frequent sequential patterns.

prefixSpan.findFrequentSequentialPatterns(df).show()

Output: Pasted image 20250320212034.png


VIII. Tối ưu hóa Máy học: Lựa chọn mô hình và điều chỉnh siêu tham số (ML Tuning: model selection and hyperparameter tuning)

Tối ưu hóa Máy học: Lựa chọn mô hình và điều chỉnh siêu tham số là một phần quan trọng trong quy trình phát triển mô hình, giúp tìm ra mô hình hoặc các tham số tốt nhất cho một nhiệm vụ cụ thể. Apache Spark cung cấp các công cụ như CrossValidatorTrainValidationSplit để hỗ trợ quá trình này.

1. Lựa chọn mô hình (hay còn gọi là điều chỉnh siêu tham số)

Trong máy học, lựa chọn mô hình liên quan đến việc sử dụng dữ liệu để tìm ra mô hình hoặc các tham số tối ưu cho một nhiệm vụ cụ thể. Quá trình này có thể được thực hiện cho từng Estimator riêng lẻ như LogisticRegression, hoặc cho toàn bộ Pipeline bao gồm nhiều thuật toán, bước tiền xử lý và các bước khác. Spark cho phép người dùng điều chỉnh toàn bộ Pipeline cùng một lúc, thay vì điều chỉnh từng thành phần riêng lẻ.

Các công cụ như CrossValidatorTrainValidationSplit yêu cầu các thành phần sau:

  • Estimator: Thuật toán hoặc Pipeline cần được điều chỉnh.
  • ParamMap: Tập hợp các tham số để lựa chọn, thường được gọi là "lưới tham số" để tìm kiếm.
  • Evaluator: Chỉ số để đo lường hiệu suất của mô hình trên dữ liệu kiểm tra.

Quy trình hoạt động chung của các công cụ lựa chọn mô hình như sau:

  • Chia dữ liệu đầu vào thành các tập huấn luyện và kiểm tra riêng biệt.
  • Đối với mỗi cặp (huấn luyện, kiểm tra), lặp qua tập hợp các ParamMap:
    • Với mỗi ParamMap, huấn luyện Estimator với các tham số đó, nhận được mô hình đã huấn luyện và đánh giá hiệu suất của mô hình bằng Evaluator.
  • Chọn mô hình được tạo ra bởi tập tham số có hiệu suất tốt nhất.

Evaluator có thể là RegressionEvaluator cho các bài toán hồi quy, BinaryClassificationEvaluator cho dữ liệu nhị phân, MulticlassClassificationEvaluator cho các bài toán phân loại đa lớp, MultilabelClassificationEvaluator cho phân loại đa nhãn, hoặc RankingEvaluator cho các bài toán xếp hạng. Chỉ số mặc định được sử dụng để chọn ParamMap tốt nhất có thể được ghi đè bằng phương thức setMetricName trong mỗi Evaluator này.

Để xây dựng lưới tham số, người dùng có thể sử dụng tiện ích ParamGridBuilder. Mặc định, các tập hợp tham số từ lưới tham số được đánh giá tuần tự. Việc đánh giá tham số có thể được thực hiện song song bằng cách đặt parallelism với giá trị từ 2 trở lên (giá trị 1 sẽ là tuần tự) trước khi chạy lựa chọn mô hình với CrossValidator hoặc TrainValidationSplit. Giá trị của parallelism nên được chọn cẩn thận để tối đa hóa khả năng song song mà không vượt quá tài nguyên của cụm, và các giá trị lớn hơn không phải lúc nào cũng dẫn đến hiệu suất được cải thiện. Thông thường, giá trị lên đến 10 là đủ cho hầu hết các cụm.

2. Cross-Validation (Xác thực chéo)

CrossValidator bắt đầu bằng cách chia tập dữ liệu thành một tập hợp các folds được sử dụng như các tập huấn luyện và kiểm tra riêng biệt. Ví dụ, với folds, CrossValidator sẽ tạo ra 3 cặp (huấn luyện, kiểm tra), mỗi cặp sử dụng 2/3 dữ liệu để huấn luyện và 1/3 để kiểm tra. Mỗi fold được sử dụng làm tập kiểm tra đúng một lần.

Để minh họa, giả sử chúng ta có một Pipeline với các bước:

  • Tokenizer: Chuyển đổi văn bản thành các từ.
  • HashingTF: Chuyển đổi các từ thành vector đặc trưng.
  • LogisticRegression: Mô hình hồi quy logistic.

Chúng ta có thể sử dụng ParamGridBuilder để xây dựng lưới tham số cho HashingTFLogisticRegression, sau đó sử dụng CrossValidator để tìm tập tham số tốt nhất dựa trên chỉ số đánh giá.

from pyspark.sql import SparkSession

from pyspark.ml import Pipeline

from pyspark.ml.classification import LogisticRegression

from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.ml.feature import HashingTF, Tokenizer

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

  

spark = SparkSession.builder.appName("Cross-Validation Example").getOrCreate()

  
  

# Prepare training documents, which are labeled.

training = spark.createDataFrame([

    (0, "a b c d e spark", 1.0),

    (1, "b d", 0.0),

    (2, "spark f g h", 1.0),

    (3, "hadoop mapreduce", 0.0),

    (4, "b spark who", 1.0),

    (5, "g d a y", 0.0),

    (6, "spark fly", 1.0),

    (7, "was mapreduce", 0.0),

    (8, "e spark program", 1.0),

    (9, "a e c l", 0.0),

    (10, "spark compile", 1.0),

    (11, "hadoop software", 0.0)

], ["id", "text", "label"])

  

# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.

tokenizer = Tokenizer(inputCol="text", outputCol="words")

hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")

lr = LogisticRegression(maxIter=10)

pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

  

# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.

# This will allow us to jointly choose parameters for all Pipeline stages.

# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.

# We use a ParamGridBuilder to construct a grid of parameters to search over.

# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,

# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.

paramGrid = ParamGridBuilder() \

    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \

    .addGrid(lr.regParam, [0.1, 0.01]) \

    .build()

  

crossval = CrossValidator(estimator=pipeline,

                          estimatorParamMaps=paramGrid,

                          evaluator=BinaryClassificationEvaluator(),

                          numFolds=2)  # use 3+ folds in practice

  

# Run cross-validation, and choose the best set of parameters.

cvModel = crossval.fit(training)

  

# Prepare test documents, which are unlabeled.

test = spark.createDataFrame([

    (4, "spark i j k"),

    (5, "l m n"),

    (6, "mapreduce spark"),

    (7, "apache hadoop")

], ["id", "text"])

  

# Make predictions on test documents. cvModel uses the best model found (lrModel).

prediction = cvModel.transform(test)

selected = prediction.select("id", "text", "probability", "prediction")

for row in selected.collect():

    print(row)

Output: Pasted image 20250320213106.png

Train-Validation Split (Chia tập Huấn luyện-Kiểm tra)

TrainValidationSplit là một phương pháp khác để điều chỉnh siêu tham số, hoạt động tương tự như CrossValidator, nhưng chỉ chia tập dữ liệu một lần thành tập huấn luyện và kiểm tra. Phương pháp này thường nhanh hơn CrossValidator, nhưng có thể ít chính xác hơn do chỉ sử dụng một lần chia dữ liệu.

Ví dụ, chúng ta có thể sử dụng TrainValidationSplit với cùng Pipeline và lưới tham số như trên, nhưng chỉ chia tập dữ liệu một lần và tìm tập tham số tốt nhất dựa trên chỉ số đánh giá.

Cả CrossValidatorTrainValidationSplit đều trả về một mô hình đã được huấn luyện với tập tham số tốt nhất, có thể được sử dụng để dự đoán trên dữ liệu mới.

from pyspark.sql import SparkSession

from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.regression import LinearRegression

from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

  

spark = SparkSession.builder.appName("Train-Validation split Example").getOrCreate()

  

# Prepare training and test data.

data = spark.read.format("libsvm")\

    .load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_linear_regression_data.txt")

train, test = data.randomSplit([0.9, 0.1], seed=12345)

  
  
  

lr = LinearRegression(maxIter=10)

  

# We use a ParamGridBuilder to construct a grid of parameters to search over.

# TrainValidationSplit will try all combinations of values and determine best model using

# the evaluator.

paramGrid = ParamGridBuilder()\

    .addGrid(lr.regParam, [0.1, 0.01]) \

    .addGrid(lr.fitIntercept, [False, True])\

    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\

    .build()

  

# In this case the estimator is simply the linear regression.

# A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.

tvs = TrainValidationSplit(estimator=lr,

                           estimatorParamMaps=paramGrid,

                           evaluator=RegressionEvaluator(),

                           # 80% of the data will be used for training, 20% for validation.

                           trainRatio=0.8)

  

# Run TrainValidationSplit, and choose the best set of parameters.

model = tvs.fit(train)

  

# Make predictions on test data. model is the model with combination of parameters

# that performed best.

model.transform(test)\

    .select("features", "label", "prediction")\

    .show()

Output: Pasted image 20250320213846.png


IV. Advanced topics

Tối ưu hóa các phương pháp tuyến tính (dành cho nhà phát triển)

1.L-BFGS (Limited-memory Broyden–Fletcher–Goldfarb–Shanno)

L-BFGS là một thuật toán tối ưu hóa thuộc họ các phương pháp quasi-Newton, được sử dụng để giải các bài toán tối ưu hóa dạng:

Phương pháp L-BFGS xấp xỉ hàm mục tiêu cục bộ như một hàm bậc hai mà không cần tính toán ma trận Hessian (ma trận đạo hàm bậc hai). Thay vào đó, ma trận Hessian được xấp xỉ bằng các đánh giá gradient trước đó, giúp tránh vấn đề về khả năng mở rộng theo chiều dọc khi số lượng đặc trưng lớn. Do đó, L-BFGS thường đạt được hội tụ nhanh hơn so với các phương pháp tối ưu hóa bậc nhất khác.

OWL-QN (Orthant-Wise Limited-memory Quasi-Newton) là một mở rộng của L-BFGS, có thể xử lý hiệu quả việc regularization L1 và elastic net.

Trong MLlib, L-BFGS được sử dụng làm solver cho các thuật toán như LinearRegression, LogisticRegression, AFTSurvivalRegressionMultilayerPerceptronClassifier.

2.Trình giải phương trình chuẩn cho bình phương tối thiểu có trọng số

MLlib triển khai trình giải phương trình chuẩn cho bình phương tối thiểu có trọng số thông qua lớp WeightedLeastSquares.

Với quan sát có trọng số , trong đó:

  • là trọng số của quan sát thứ
  • là vector đặc trưng của quan sát thứ
  • là nhãn của quan sát thứ

Số lượng đặc trưng cho mỗi quan sát là . Chúng ta sử dụng công thức bình phương tối thiểu có trọng số như sau:

trong đó:

  • là tham số regularization
  • là tham số trộn elastic-net
  • là độ lệch chuẩn của nhãn
  • là độ lệch chuẩn của cột đặc trưng thứ

Hàm mục tiêu này chỉ yêu cầu một lần duyệt qua dữ liệu để thu thập các thống kê cần thiết. Đối với ma trận dữ liệu kích thước , các thống kê này chỉ yêu cầu lưu trữ , do đó có thể được lưu trữ trên một máy đơn lẻ khi (số lượng đặc trưng) tương đối nhỏ. Sau đó, chúng ta có thể giải các phương trình chuẩn trên một máy đơn lẻ bằng các phương pháp như phân tích Cholesky trực tiếp hoặc các chương trình tối ưu hóa lặp.

Hiện tại, Spark MLlib hỗ trợ hai loại solver cho các phương trình chuẩn: phân tích Cholesky và các phương pháp Quasi-Newton (L-BFGS/OWL-QN). Phân tích Cholesky phụ thuộc vào ma trận hiệp phương sai xác định dương (tức là các cột của ma trận dữ liệu phải độc lập tuyến tính) và sẽ thất bại nếu điều kiện này bị vi phạm.